{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using Redshift Data Lake Export\n",
    "\n",
    "Redshift Data Lake Export gives you the ability to unload the result of a Redshift query to your S3 data lake in Apache Parquet format. This enables you to save data transformation and enrichment you have done in Redshift into your S3 data lake in an open format.\n",
    "\n",
    "You can specify one or more partition columns so that unloaded data is automatically partitioned into folders in your Amazon S3 bucket. \n",
    "\n",
    "For example, you can choose to unload our customer reviews data and partition it by `product_category`. This enables your queries to take advantage of partition pruning and skip scanning non-relevant partitions, improving query performance and minimizing cost."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/redshift_unload_parquet.png\" width=\"60%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "\n",
    "# Get region \n",
    "session = boto3.session.Session()\n",
    "region_name = session.region_name\n",
    "\n",
    "# Get SageMaker session & default S3 bucket\n",
    "sagemaker_session = sagemaker.Session()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "\n",
    "redshift = boto3.client('redshift')\n",
    "secretsmanager = boto3.client('secretsmanager')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set S3 prefixes\n",
    "parquet_prefix_unload = 'amazon-reviews-pds/parquet-from-redshift'\n",
    "\n",
    "# Set S3 paths for Parquet unload\n",
    "s3_path_parquet_unload = 's3://{}/{}'.format(bucket, parquet_prefix_unload)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Redshift credentials"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "secret = secretsmanager.get_secret_value(SecretId='dsoaws_redshift_login')\n",
    "cred = json.loads(secret['SecretString'])\n",
    "\n",
    "master_user_name = cred[0]['username']\n",
    "master_user_pw = cred[1]['password']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Redshift configuration parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "redshift_cluster_identifier = 'dsoaws'\n",
    "\n",
    "database_name_redshift = 'dsoaws'\n",
    "database_name_athena = 'dsoaws'\n",
    "\n",
    "redshift_port = '5439'\n",
    "\n",
    "schema_redshift = 'redshift'\n",
    "schema_athena = 'athena'\n",
    "\n",
    "table_name_tsv = 'amazon_reviews_tsv'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Please Wait for Cluster Status  `Available`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)\n",
    "cluster_status = response['Clusters'][0]['ClusterStatus']\n",
    "print(cluster_status)\n",
    "\n",
    "while cluster_status != 'available':\n",
    "    time.sleep(10)\n",
    "    response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)\n",
    "    cluster_status = response['Clusters'][0]['ClusterStatus']\n",
    "    print(cluster_status)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get Redshift endpoint address & IAM Role"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set Redshift endpoint address & IAM Role\n",
    "response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)\n",
    "\n",
    "redshift_endpoint_address = response['Clusters'][0]['Endpoint']['Address']\n",
    "iam_role = response['Clusters'][0]['IamRoles'][0]['IamRoleArn']\n",
    "\n",
    "print(redshift_endpoint_address)\n",
    "print(iam_role)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Redshift Connection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import awswrangler as wr\n",
    "\n",
    "con_redshift = wr.data_api.redshift.connect(\n",
    "    cluster_id=redshift_cluster_identifier,\n",
    "    database=database_name_redshift,\n",
    "    db_user=master_user_name,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Unload Parquet Data From Redshift To S3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Simply run the following SQL command to unload our 2014 and 2015 customer reviews data in Parquet file format into S3, partitioned by `product_category`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def unload_redshift_table(wr, con_redshift, table_name_prefix, start_year, end_year, s3_path, iam_role):\n",
    "    for year in range(start_year, end_year+1, 1):\n",
    "        current_table_name = table_name_prefix+'_'+str(year)\n",
    "        statement = \"\"\"\n",
    "            UNLOAD ('SELECT marketplace, customer_id, review_id, product_id, product_parent, \n",
    "                product_title, product_category, star_rating, helpful_votes, total_votes, \n",
    "                vine, verified_purchase, review_headline, review_body, review_date, year \n",
    "            FROM redshift.{}')\n",
    "            TO '{}/{}/'\n",
    "            IAM_ROLE '{}'\n",
    "            PARQUET PARALLEL ON \n",
    "            PARTITION BY (product_category)\n",
    "        \"\"\".format(current_table_name, s3_path, year, iam_role)\n",
    "\n",
    "        wr.data_api.redshift.read_sql_query(\n",
    "            sql=statement,\n",
    "            con=con_redshift,\n",
    "        )\n",
    "    print(\"Done.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The following `UNLOAD` command can take some time to complete. Please be patient. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "unload_redshift_table(wr, con_redshift, 'amazon_reviews_tsv', 2014, 2015, s3_path_parquet_unload, iam_role)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# List the S3 Directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(s3_path_parquet_unload)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $s3_path_parquet_unload/2014/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $s3_path_parquet_unload/2015/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
